%%HTML
<!--
HTML code to create the sidebar, menu and independent popup window.
Inspiration: https://github.com/vizmotion/jupyter-navigation
-->
<style>
.yourDiv {position: fixed;top: 100px; left: 0px; background: white;height: 100%;width: 150px; padding: 20px; z-index: 10000}
</style>
<script>
function showthis(url) {
window.open(url, "pres", "toolbar=yes,scrollbars=yes,resizable=yes,top=10,left=400,width=500,height=500");
return(false);
}
</script>
<div class=yourDiv>
<a href=#loc_settings>Run Settings</a><br>
<a href=#loc_part1_extraction_pipeline>Part 1: Extraction Pipeline</a><br>
<a href=#loc_classifier_training>Train Classifier</a><br>
<a href=#loc_part2_load_classifier>Part 2: Load classifier</a><br>
<a href=#loc_window_search>Window Search</a><br>
<a href=#loc_tracker>Tracker</a><br>
<a href=#loc_apply_pipeline>Apply pipeline to Video</a><br>
</div>
%%HTML
<a name="loc_settings"></a>
# --- Run settings: toggles controlling which pipeline stages execute below ---
SETTING_RETRAIN_CLASSIFIER = True # Change this to retrain the classifier
SETTING_SAVE_RETRAINED_CLASSIFIER = True # Change this to save the retrained classifier (if retraining)
SETTING_LOAD_TRAINED_CLASSIFIER = True # Change this in case SETTING_SAVE_RETRAINED_CLASSIFIER and you want to use the freshly trained classifier without overwriting the one on disc
SETTING_TEST_CLASSIFIER_ON_TEST_IMAGES = True # Change this to show how the classifier works on test images
# SKLearn
import sklearn
from sklearn import preprocessing
from sklearn.externals import joblib
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV
from sklearn.svm import LinearSVC, NuSVC
from sklearn import svm
from skimage.feature import hog
from sklearn.cross_validation import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
# MatPlotLib
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
%matplotlib inline
# The rest
import numpy as np
from scipy.ndimage.measurements import label
import cv2
from glob import glob
import random
import csv
import time
import os
from importlib import reload
import multiprocessing as multiprocessing
# Custom code
from common_geometry import Rect, Point
from window_slider import WindowSlider, PartitioningWindowSlider, PartitioningWindowSliderGroup
from visualization_utils import draw_boxes, draw_rects
%%HTML
<a name="loc_part1_extraction_pipeline"></a>
def convert_color(img, color_space='LUV'):
    """Convert an RGB image into the requested color space.

    Args:
        img: RGB image array.
        color_space: One of 'RGB', 'HSV', 'LUV', 'HLS', 'YUV', 'YCrCb'.

    Returns:
        The converted image; for 'RGB' an independent copy of the input.

    Raises:
        ValueError: For an unsupported color-space name. (The original code
            left the result variable unbound in that case, which surfaced as
            an UnboundLocalError instead of a clear error.)
    """
    if color_space == 'RGB':
        return np.copy(img)
    if color_space == 'HSV':
        return cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
    elif color_space == 'LUV':
        return cv2.cvtColor(img, cv2.COLOR_RGB2LUV)
    elif color_space == 'HLS':
        return cv2.cvtColor(img, cv2.COLOR_RGB2HLS)
    elif color_space == 'YUV':
        return cv2.cvtColor(img, cv2.COLOR_RGB2YUV)
    elif color_space == 'YCrCb':
        return cv2.cvtColor(img, cv2.COLOR_RGB2YCrCb)
    raise ValueError('Unsupported color space: {}'.format(color_space))
def get_hog_features(img,
                     orient,
                     pix_per_cell,
                     cell_per_block,
                     vis=False,
                     feature_vec=True):
    """Compute HOG features for a single-channel image.

    Args:
        img: 2-D (single channel) image.
        orient: Number of gradient orientation bins.
        pix_per_cell: Cell side length in pixels.
        cell_per_block: Block side length in cells.
        vis: When True, skimage also returns a visualization image.
        feature_vec: When True, the features are returned flattened.

    Returns:
        The HOG feature array, or (features, hog_image) when vis is True —
        exactly what skimage.feature.hog produces for these arguments.
    """
    hog_kwargs = dict(
        orientations=orient,
        pixels_per_cell=(pix_per_cell, pix_per_cell),
        cells_per_block=(cell_per_block, cell_per_block),
        transform_sqrt=False,
        visualise=vis,
        feature_vector=feature_vec,
        block_norm='L2',
    )
    # skimage already returns a (features, image) pair when visualise=True
    # and just the features otherwise, so a single call covers both cases.
    return hog(img, **hog_kwargs)
def bin_spatial(img, size=(32, 32)):
    """Downsample each color channel to `size` and concatenate the flattened
    channels into one spatial-binning feature vector."""
    flattened_channels = [
        cv2.resize(img[:, :, ch], size, interpolation=cv2.INTER_LINEAR).ravel()
        for ch in range(3)
    ]
    return np.hstack(flattened_channels)
def color_hist(img, nbins=32, bins_range=(0, 256)):
    """Per-channel color histogram features.

    Computes a histogram for each of the three channels over `bins_range`
    and concatenates the bin counts into one 1-D feature vector.
    """
    channel_counts = [
        np.histogram(img[:, :, ch], bins=nbins, range=bins_range)[0]
        for ch in range(3)
    ]
    return np.concatenate(channel_counts)
IMG_SHAPE = (64, 64)  # Width/height of the image patches the classifier was trained on
class FeatureExtractorConfig:
    """Bundle of tunables for FeatureExtractor: color space, spatial binning,
    color histograms and HOG parameters."""

    def __init__(self):
        # Color conversion applied before any feature extraction
        self.color_space = 'LUV'  # One of: RGB, HSV, LUV, HLS, YUV, YCrCb
        # Spatial-binning and color-histogram settings
        self.spatial_size = (48, 48)
        self.hist_bins = 64
        # HOG settings
        self.orient = 12
        self.pix_per_cell = 8
        self.cell_per_block = 1
        self.hog_channel = 'ALL'  # 0, 1, 2 or 'ALL'
        # Feature-group on/off toggles
        self.spatial_feat = True
        self.hist_feat = True
        self.hog_feat = True

    def print(self):
        """Log the HOG portion of the configuration."""
        print('Using: {} orientations; {} pixels per cell; and {} cells per block'.format(
            self.orient, self.pix_per_cell, self.cell_per_block))
class FeatureExtractor:
    """Builds the combined feature vector (raw converted pixels + spatial
    bins + color histograms + HOG) used to train and query the classifier.

    NOTE: the order in which feature groups are appended is part of the
    model contract — classifiers persisted to disk depend on it.
    """
    def __init__(self, config):
        # config: a FeatureExtractorConfig controlling which groups are produced
        self.config = config
    def extract_features(self, img):
        """Return a 1-D feature vector for a single RGB image patch."""
        # Resize to target spatial size
        img_resize = cv2.resize(img, self.config.spatial_size, interpolation=cv2.INTER_LINEAR)
        # Gamma-style normalization: sqrt of the 0..1 intensities, back to uint8
        img_resize = (np.sqrt(img_resize.astype(np.float32)/255)*255).astype(np.uint8)
        file_features = []
        # Color space conversion
        feature_image = convert_color(img_resize, color_space=self.config.color_space)
        # The raw converted pixels are included as their own feature group
        file_features.append(feature_image.ravel())
        # Spatial color binning
        if self.config.spatial_feat == True:
            spatial_features = bin_spatial(feature_image, size=self.config.spatial_size)
            file_features.append(spatial_features)
        # Histogram color
        if self.config.hist_feat == True:
            hist_features = color_hist(feature_image, nbins=self.config.hist_bins)
            file_features.append(hist_features)
        # Histogram of oriented gradients
        if self.config.hog_feat == True:
            # Call get_hog_features() with vis=False, feature_vec=True
            if self.config.hog_channel == 'ALL':
                hog_features = []
                for channel in range(feature_image.shape[2]):
                    hog_features.append(get_hog_features(feature_image[:,:,channel],
                                                         self.config.orient,
                                                         self.config.pix_per_cell,
                                                         self.config.cell_per_block)
                                        )
                hog_features = np.ravel(hog_features)
            else:
                hog_features = get_hog_features(feature_image[:,:,self.config.hog_channel],
                                                self.config.orient,
                                                self.config.pix_per_cell,
                                                self.config.cell_per_block)
            # Append the new feature vector to the features list
            file_features.append(hog_features)
        return np.concatenate(file_features)
%%HTML
<a name="loc_classifier_training"></a>
class TrainingImagesConfig:
    """File lists and per-class sample counts for the training data set."""

    def __init__(self):
        # Labeled 64x64 training images shipped with the dataset
        self.vehicle_fnames = glob('dataset/vehicles/*/*.png')
        self.non_vehicle_fnames = glob('dataset/non-vehicles/*/*.png')
        # Hard-negative examples mined from earlier pipeline runs
        self.non_vehicle_mined_fnames = glob('dataset/non-vehicles-mined/*.png')
        # Number of images sampled per class (kept balanced)
        self.vehicle_samples = 8790
        self.non_vehicle_samples = 8790
class TrainingImageLoader:
    """Loads and normalizes the training images, parallelized via a process pool."""

    def __init__(self, config):
        self.config = config
        # Bug fix: this attribute was previously named `is_loaded`, which
        # shadowed the is_loaded() method and made it uncallable.
        self._loaded = False

    def load(self):
        """Read, 8-bit-convert and (for mined negatives) resize all images."""
        # Sample from file names. We don't subsample hard mined images deliberately.
        vehicle_fnames = random.sample(self.config.vehicle_fnames, self.config.vehicle_samples)
        non_vehicle_fnames = random.sample(self.config.non_vehicle_fnames, self.config.non_vehicle_samples)
        with multiprocessing.Pool() as pool:
            # Load each image into memory
            self.car_images = pool.map(mpimg.imread, vehicle_fnames)
            self.non_car_images = pool.map(mpimg.imread, non_vehicle_fnames)
            self.non_car_mined_images = pool.map(mpimg.imread, self.config.non_vehicle_mined_fnames)
            # Convert to 8-bit channels
            self.car_images = pool.map(TrainingImageLoader.convert_to_8_bit, self.car_images)
            self.non_car_images = pool.map(TrainingImageLoader.convert_to_8_bit, self.non_car_images)
            self.non_car_mined_images = pool.map(TrainingImageLoader.convert_to_8_bit, self.non_car_mined_images)
            # Hard mined non-car images need to be resized to the training patch shape
            self.non_car_mined_images = pool.map(TrainingImageLoader.resize_to_expected_shape, self.non_car_mined_images)
        self._loaded = True

    @staticmethod
    def resize_to_expected_shape(image):
        """Resize to the 64x64 shape the feature extractor expects."""
        return cv2.resize(image, IMG_SHAPE)

    @staticmethod
    def convert_to_8_bit(image):
        """Scale the image to the full uint8 range.

        Guards against an all-zero image, which previously caused a
        division by zero (NaNs cast to uint8).
        """
        peak = np.max(image)
        if peak == 0:
            return np.zeros_like(image, dtype=np.uint8)
        return (image.astype(np.float32) / peak * 255).astype(np.uint8)

    def is_loaded(self):
        """Whether load() has completed."""
        return self._loaded
# Load the training images up front (slow) only when retraining is requested.
if SETTING_RETRAIN_CLASSIFIER == True:
    print("Loading images - Starting")
    training_images_config = TrainingImagesConfig()
    training_img_loader = TrainingImageLoader(training_images_config)
    training_img_loader.load()
    print("Loading images - Done")
class TrainingAndTestSet:
    """Builds scaled, shuffled train/test feature matrices from raw images."""

    def __init__(self,
                 car_images,
                 non_car_images,
                 test_size,
                 feature_extractor):
        assert car_images is not None, "Images must be loaded prior to TrainingAndTestSet initialization"
        assert non_car_images is not None, "Images must be loaded prior to TrainingAndTestSet initialization"
        assert feature_extractor is not None, "Must provide feature extractor"
        self.car_images = car_images
        self.non_car_images = non_car_images
        self.test_size = test_size
        self.feature_extractor = feature_extractor

    def load(self):
        """Extract features, scale them, and split into train/test sets."""
        # Feature extraction is CPU-bound; fan it out over a process pool.
        with multiprocessing.Pool() as pool:
            car_features = pool.map(self.feature_extractor.extract_features, self.car_images)
            non_car_features = pool.map(self.feature_extractor.extract_features, self.non_car_images)
        # Stack the feature rows and build matching labels (1 = car, 0 = non-car)
        X_cars = np.vstack(car_features)
        X_non_cars = np.vstack(non_car_features)
        y_cars = np.ones(X_cars.shape[0], dtype=np.uint8)
        y_non_cars = np.zeros(X_non_cars.shape[0], dtype=np.uint8)
        print('{} cars & {} non-cars labeled'.format(len(X_cars), len(X_non_cars)))
        # Join the training vectors and labels
        X = np.vstack((X_cars, X_non_cars))
        y = np.concatenate((y_cars, y_non_cars))
        # Fit a per-column scaler on the whole data set, then apply it
        self.X_scaler = StandardScaler().fit(X)
        scaled_X = self.X_scaler.transform(X)
        # Split into randomized training and test sets
        rand_state = np.random.randint(0, 100)
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            scaled_X,
            y,
            test_size=self.test_size,
            random_state=rand_state
        )
def retrain_classifier():
    """Assemble and load a TrainingAndTestSet from the globally loaded images.

    Relies on the module-level `training_img_loader` having been loaded.
    Returns the populated TrainingAndTestSet.
    """
    print("Extracting features and creating test set - Starting")
    config = FeatureExtractorConfig()
    config.print()
    extractor = FeatureExtractor(config)
    # Regular and hard-mined negatives are pooled together
    negatives = training_img_loader.non_car_images + training_img_loader.non_car_mined_images
    data_set = TrainingAndTestSet(training_img_loader.car_images,
                                  negatives,
                                  0.2,
                                  extractor)
    data_set.load()
    print("Extracting features and creating test set - Done")
    return data_set
# Rebuild the feature data set only when retraining was requested above.
if SETTING_RETRAIN_CLASSIFIER == True:
    training_and_test_set = retrain_classifier()
# After trying a (nearly) infinite number of kernels via grid search... I settled on the best performing one
#SEARCH_C_VALUES=[0.0001]
#SEARCH_KERNEL_VALUES=['linear']
#SEARCH_GAMMA_VALUES=[0.00001]
#parameters = {'kernel': SEARCH_KERNEL_VALUES, 'C': SEARCH_C_VALUES, 'gamma': SEARCH_GAMMA_VALUES}
#
# def optimize_param(parameters):
# svc = GridSearchCV(svm.SVC(), parameters)
# # Check the training time for the SVC
# t=time.time()
# svc.fit(X_train, y_train)
# t2 = time.time()
# print(round(t2-t, 2), 'Seconds to train SVC...')
# optimal_params = svc.best_params_
# print("Optimal parameters found: ", optimal_params, "\n")
# # Check the score of the SVC
# test_score = round(svc.score(X_test, y_test), 4)
# print('Test Accuracy of SVC = ', test_score)
# # Check the prediction time for a single sample
# t=time.time()
class VehicleClassifier:
    """Binary car/non-car classifier: feature extractor + scaler + fitted SVM."""

    def __init__(self, feature_extractor, svca, X_scaler):
        self.feature_extractor = feature_extractor
        self.svca = svca
        self.X_scaler = X_scaler

    def is_vehicle(self, image):
        """Return whether the (arbitrary-size) image patch is classified as a vehicle."""
        # Bring the patch to the 64x64 shape used during training
        resized_img = cv2.resize(image, IMG_SHAPE)
        # Extract features for the patch
        features = self.feature_extractor.extract_features(resized_img)
        # Scale extracted features to be fed to the classifier.
        # Bug fix: previously referenced the module-level `classifier`
        # global instead of self, which broke any second instance.
        scaled_features = self.X_scaler.transform(np.array(features).reshape(1, -1))
        # Predict; vehicle is class 1
        prediction = self.svca.predict(scaled_features)
        return prediction == 1
class ClassifierTrainer:
    """Trains a LinearSVC and reports accuracy statistics.

    Both methods are stateless and invoked directly on the class, so they
    are declared as static methods (previously they were plain functions
    missing the decorator, which only worked by accident in Python 3).
    """

    @staticmethod
    def train(training_and_test_set):
        """Fit a LinearSVC on the training set; returns a VehicleClassifier."""
        print('Feature vector length:', len(training_and_test_set.X_train[0]))
        svca = LinearSVC(C=0.0001, dual=True, max_iter=10)
        # A single batch currently covers the whole training set; the batching
        # scaffold is kept in case incremental training is reintroduced.
        batch_size = len(training_and_test_set.X_train)//1
        print('training on {} samples, {} batches, each batch {} samples'.format(
            len(training_and_test_set.X_train),
            len(training_and_test_set.X_train)//batch_size,
            batch_size))
        for batch in range(0, len(training_and_test_set.X_train)//batch_size):
            t = time.time()
            svca.fit(
                training_and_test_set.X_train[batch*batch_size:(batch+1)*batch_size],
                training_and_test_set.y_train[batch*batch_size:(batch+1)*batch_size]
            )
            t2 = time.time()
            # Check the score of the SVC against the held-out test set
            test_score = round(svca.score(training_and_test_set.X_test, training_and_test_set.y_test), 4)
            print('Batch {}; seconds to train {}; test accuracy {}'.format(batch+1, round(t2-t, 2), test_score))
        return VehicleClassifier(training_and_test_set.feature_extractor,
                                 svca,
                                 training_and_test_set.X_scaler)

    @staticmethod
    def print_stats(classifier, training_and_test_set):
        """Print a per-class precision/recall report on the test set."""
        predict = classifier.svca.predict(training_and_test_set.X_test)
        labels = training_and_test_set.y_test
        print(classification_report(labels, predict))
# Train the classifier and print its test-set statistics when requested.
if SETTING_RETRAIN_CLASSIFIER == True:
    classifier = ClassifierTrainer.train(training_and_test_set)
    print("Training complete. Statistics: ")
    ClassifierTrainer.print_stats(classifier, training_and_test_set)
# Where the fitted SVM and its feature scaler are persisted on disk.
CLASSIFIER_SAVE_PATH = 'intermediates/car_classifier_with_mined.pkl'
SCALER_SAVE_PATH = 'intermediates/car_scaler_with_mined.pkl'
if SETTING_SAVE_RETRAINED_CLASSIFIER == True and SETTING_RETRAIN_CLASSIFIER == True:
    joblib.dump(classifier.svca, CLASSIFIER_SAVE_PATH)
    joblib.dump(classifier.X_scaler, SCALER_SAVE_PATH)
%%HTML
<a name="loc_part2_load_classifier"></a>
# Paths of the persisted classifier/scaler to load (match the save paths above).
CLASSIFIER_LOAD_PATH = 'intermediates/car_classifier_with_mined.pkl'
SCALER_LOAD_PATH = 'intermediates/car_scaler_with_mined.pkl'
def load_classifier(classifier_path, scaler_path):
    """Load a pickled SVM and scaler from disk, wrapped in a VehicleClassifier.

    Args:
        classifier_path: Path to the pickled LinearSVC.
        scaler_path: Path to the pickled StandardScaler.

    Returns:
        A ready-to-use VehicleClassifier.
    """
    # joblib.load raises on failure, so the previous `svca = None`
    # pre-assignment was dead code; also use identity comparison (PEP 8).
    svca = joblib.load(classifier_path)
    if svca is not None:
        print('Classifier loaded successfully:')
        print(svca)
    X_scaler = joblib.load(scaler_path)
    if X_scaler is not None:
        print('Scaler loaded successfully:')
        print(X_scaler)
    # WARNING!!!! Config must match saved classifier config!
    # TODO: Persistence of classifier should also save config. Same for loading
    feature_extractor_config = FeatureExtractorConfig()
    feature_extractor = FeatureExtractor(feature_extractor_config)
    return VehicleClassifier(feature_extractor, svca, X_scaler)
# Load the persisted classifier unless the freshly trained one should be used.
if SETTING_LOAD_TRAINED_CLASSIFIER == True:
    classifier = load_classifier(CLASSIFIER_LOAD_PATH, SCALER_LOAD_PATH)
%%HTML
<a name="loc_window_search"></a>
# Sliding-window search ranges: window size, x/y pixel bounds, and x/y overlap
# fractions. (An earlier tuning of the ranges is kept below for reference.)
# WINDOW_SEARCH_RANGES = [
# #     {'window_size': (256,256), 'x_start_stop': [None, None], 'y_start_stop': [360, 740], 'xy_overlap':(0.8, 0.8)},
#     {'window_size': (192,192), 'x_start_stop': [None, None], 'y_start_stop': [360, 740], 'xy_overlap':(0.8, 0.8)},
#     {'window_size': (128,128), 'x_start_stop': [50, 1300], 'y_start_stop': [360, 600], 'xy_overlap':(0.8, 0.8)},
#     {'window_size': (64,64), 'x_start_stop': [200, 1200], 'y_start_stop': [360, 500], 'xy_overlap':(0.85, 0.85)},
# ]
WINDOW_SEARCH_RANGES = [
#     {'window_size': (256,256), 'x_start_stop': [None, None], 'y_start_stop': [360, 740], 'xy_overlap':(0.7, 0.7)},
    {'window_size': (192,192), 'x_start_stop': [0, 1280], 'y_start_stop': [360, 740], 'xy_overlap':(0.80, 0.90)},
    {'window_size': (160,160), 'x_start_stop': [0, 1280], 'y_start_stop': [300, 740], 'xy_overlap':(0.80, 0.90)},
    {'window_size': (128,128), 'x_start_stop': [0, 1280], 'y_start_stop': [350, 700], 'xy_overlap':(0.80, 0.90)},
    {'window_size': (96,96), 'x_start_stop': [200, 1200], 'y_start_stop': [400, 600], 'xy_overlap':(0.80, 0.90)},
    {'window_size': (64,64), 'x_start_stop': [200, 1200], 'y_start_stop': [400, 600], 'xy_overlap':(0.70, 0.80)},
]
# def get_all_windows(image):
# all_windows = []
# for search_range in WINDOW_SEARCH_RANGES:
# windows = slide_window(xy_window=search_range['window_size'],
# x_start_stop=search_range['x_start_stop'],
# y_start_stop=search_range['y_start_stop'],
# xy_overlap=search_range['xy_overlap'])
# all_windows.extend(windows)
# return all_windows
class ImageWindowSearch:
    """Runs the classifier over a set of candidate windows of one image."""

    def __init__(self, image, classifier):
        self.image = image
        self.classifier = classifier

    def search_windows(self, windows, process_pool):
        """Classify all windows in parallel; return those containing a car."""
        check_results = process_pool.map(self.check_window, windows)
        return [window for hit, window in check_results if hit == True]

    def check_window(self, window):
        """Classify one window; returns (is_vehicle, window).

        Windows with any negative coordinate or a degenerate (zero-size)
        crop are rejected without calling the classifier.
        """
        (x1, y1), (x2, y2) = window
        if x1 < 0 or y1 < 0 or x2 < 0 or y2 < 0:
            return (False, window)
        patch = self.image[y1:y2, x1:x2]
        if patch.shape[0] == 0 or patch.shape[1] == 0:
            return (False, window)
        return (self.classifier.is_vehicle(patch), window)
def visualize_search_area():
    """Plot each window-search range over a sample test image.

    One subplot per entry in WINDOW_SEARCH_RANGES; the first window of each
    range is highlighted in magenta.
    """
    # Load an image
    files = glob('test_images/test*.jpg')
    image = mpimg.imread(files[0])
    # Plot
    f, axes_arr = plt.subplots(len(WINDOW_SEARCH_RANGES), figsize=(24, 30))
    f.tight_layout()
    for idx, ax in enumerate(axes_arr):
        search_range = WINDOW_SEARCH_RANGES[idx]
        slider = PartitioningWindowSlider(
            x_start_stop=search_range['x_start_stop'],
            y_start_stop=search_range['y_start_stop'],
            xy_window=search_range['window_size'],
            xy_overlap=search_range['xy_overlap'],
            n_partitions=1)
        windows = slider.get_all_windows()
        image_with_bboxes = draw_boxes(image, windows)
        # Highlight the first window of the range
        image_with_bboxes = draw_boxes(image_with_bboxes, [windows[0]], (255, 0, 255), thick=4)
        ax.set_title("Search range {}".format(search_range))
        ax.imshow(image_with_bboxes)
    plt.subplots_adjust(left=0., right=1, top=0.9, bottom=0.1, hspace=0.2)
    plt.show()
visualize_search_area()
def visualize_partitioned_search_area(n_frames, n_examples_skip, n_examples_show):
    """Visualize how the sliding windows are spread across frame partitions.

    Args:
        n_frames: Number of partitions (frames per full scan).
        n_examples_skip: Index of the first partition to display.
        n_examples_show: How many consecutive partitions to plot.
    """
    # Load an image
    files = glob('test_images/test*.jpg')
    image = mpimg.imread(files[0])
    # One partitioning slider per search range
    sliders = []
    for search_range in WINDOW_SEARCH_RANGES:
        slider = PartitioningWindowSlider(
            x_start_stop=search_range['x_start_stop'],
            y_start_stop=search_range['y_start_stop'],
            xy_window=search_range['window_size'],
            xy_overlap=search_range['xy_overlap'],
            n_partitions=n_frames)
        sliders.append(slider)
    # Plot one subplot per displayed partition
    f, axes_arr = plt.subplots(n_examples_show, figsize=(24, 30))
    f.tight_layout()
    for idx, ax in enumerate(axes_arr):
        partition_bboxes = []
        for slider in sliders:
            windows = slider.get_partition(idx + n_examples_skip)
            partition_bboxes.extend(windows)
        image_with_bboxes = draw_boxes(image, partition_bboxes)
        ax.set_title("Partition id {}".format(idx + n_examples_skip))
        ax.imshow(image_with_bboxes)
    plt.subplots_adjust(left=0., right=1, top=0.9, bottom=0.1, hspace=0.2)
    plt.show()
visualize_partitioned_search_area(25, 20, 5)
# Minimum number of overlapping positive windows for a heatmap pixel to survive
DETECTION_THRESHOLD = 6
def test_image_detection(image):
    """Run the full single-image detection pipeline.

    Returns a tuple of intermediate visualizations: (all scanned windows,
    positive windows, raw heatmap, thresholded heatmap, connected-component
    labels, final bounding-box image).
    """
    draw_image = np.copy(image)
    # Prepare the sliding windows
    windows = []
    for search_range in WINDOW_SEARCH_RANGES:
        slider = WindowSlider(
            x_start_stop=search_range['x_start_stop'],
            y_start_stop=search_range['y_start_stop'],
            xy_window=search_range['window_size'],
            xy_overlap=search_range['xy_overlap'])
        windows.extend(slider.get_all_windows())
    window_search = ImageWindowSearch(image, classifier)
    # Half the cores are used to keep the machine responsive
    with multiprocessing.Pool(int(multiprocessing.cpu_count()/2)) as process_pool:
        hot_windows = window_search.search_windows(windows, process_pool)
    all_windows_img = draw_boxes(draw_image, windows)
    positive_labeled_windows = draw_boxes(draw_image, hot_windows)
    heatmap = WindowSlider.windows_to_heatmap(image.shape[0:2], hot_windows)
    heatmap_orig = np.copy(heatmap)
    # Threshold
    heatmap[heatmap <= DETECTION_THRESHOLD] = 0
    # Label separate detections
    labels = label(heatmap)
    # Calculate bounding boxes for labels
    detected_bboxes = PartitioningWindowSlider.get_labeled_bboxes(labels)
    # Draw bounding boxes on image
    if detected_bboxes is not None:
        img_with_bboxes = draw_boxes(image, detected_bboxes)
    else:
        img_with_bboxes = image
    return (all_windows_img, positive_labeled_windows, heatmap_orig, heatmap, labels, img_with_bboxes)
# files = glob('test_images/problem_image.png')
# image = mpimg.imread(files[0])
# %prun test_image_detection(image)
# SETTING_TEST_CLASSIFIER_ON_TEST_IMAGES = False
# Run the detection pipeline on every test image and plot all intermediate stages.
if SETTING_TEST_CLASSIFIER_ON_TEST_IMAGES:
    files = glob('test_images/test*.jpg')
    for file in files:
        image = mpimg.imread(file)
        (all_windows_img, positive_labeled_windows, heatmap_orig, heatmap, labels, img_with_bboxes) = test_image_detection(image)
        # Plot
        f, ((ax1, ax2, ax3), (ax4, ax5, ax6)) = plt.subplots(2, 3, figsize=(24, 10))
        f.tight_layout()
        ax1.set_title('Scanned Windows', fontsize=30)
        ax1.imshow(all_windows_img)
        ax2.set_title('Positive Scans', fontsize=30)
        ax2.imshow(positive_labeled_windows)
        ax3.set_title('Heatmap before threshold', fontsize=30)
        ax3.imshow(heatmap_orig, cmap='gray')
        ax4.set_title('Heatmap after threshold', fontsize=30)
        ax4.imshow(heatmap, cmap='hot')
        ax5.set_title('Labels', fontsize=30)
        ax5.imshow(labels[0], cmap='gray')
        ax6.set_title('Final Detections', fontsize=30)
        ax6.imshow(img_with_bboxes)
        plt.subplots_adjust(left=0., right=1, top=0.9, bottom=0.1, hspace=0.2)
        plt.show()
# --- Video input/output paths ---
TEST_VIDEO_INPUT_PATH = 'test_video.mp4'
TEST_VIDEO_OUTPUT_PATH = 'output_images/test_video_output.mov'
PROJECT_VIDEO_INPUT_PATH = 'project_video.mp4'
PROJECT_VIDEO_OUTPUT_PATH = 'output_images/project_video_output.mov'
PROJECT_HEATMAP_OUTPUT_PATH = 'output_images/project_video_heatmap.mov'
PROJECT_WINDOWS_OUTPUT_PATH = 'output_images/project_video_windows.mov'
# Time over which one full sliding-window scan is spread across frames
SECONDS_TO_FULL_SCAN = 3
# Minimum relative overlap for a detection to be merged with an existing tracker
OVERLAP_THRESHOLD = 0.05
MIN_DETECTIONS_FOR_QUALITY_TRACKER = 5
MIN_DETECTIONS_FOR_TRACKER = 4
# Tracker nearby-search tuning: step sizes (as overlap fractions) and step count
TRACKER_OVERLAP_HORIZ = 0.95
TRACKER_OVERLAP_VERT = 0.98
TRACKER_STEPS_PER_DIRECTION = 3
TRACKER_SCALEDOWN = 0.98
%%HTML
<a name="loc_tracker"></a>
class VehicleTracker():
    """Tracks a single detected vehicle across video frames.

    Combines an OpenCV tracker (e.g. KCF) with classifier validation: when
    the internal tracker fails, or the classifier rejects its output, a
    window search around the last known detection is attempted instead.
    """

    def __init__(self,
                 classifier,
                 stable_id,
                 initial_frame,
                 initial_rect,
                 process_pool,
                 detection_threshold=0,
                 loss_threshold=0,
                 tracker_type="KCF"
                 ):
        self.detect_rects_history = []
        self.classifier = classifier
        self.stable_id = stable_id
        self.tracker_type = tracker_type
        self.detect_rects_history.append(initial_rect)
        self.has_been_updated = False
        self.detections = 1
        self.losses = 0
        self.loss_threshold = loss_threshold
        self.detection_threshold = detection_threshold
        self.process_pool = process_pool
        self.reinit_internal_tracker(initial_frame, initial_rect)

    def reinit_internal_tracker(self, frame, rect):
        """(Re)create the OpenCV tracker, seeded with the given rect."""
        self.internal_tracker = cv2.Tracker_create(self.tracker_type)
        self.internal_tracker.init(frame,
                                   VehicleTracker.rect_to_tracker_bbox(rect))

    def update(self, frame):
        """Advance the tracker by one frame; returns the new Rect or None when lost."""
        detection = None
        # First try with the internal tracker
        ok, bbox = self.internal_tracker.update(frame)
        if ok:
            # Tracker succeeded; validate the proposed region with the classifier
            detection_rect = VehicleTracker.tracker_bbox_to_rect(bbox)
            if self.classify_detection(frame, detection_rect):
                detection = detection_rect
        # If the internal tracker failed, or the classifier disqualified the
        # detection, fall back to a search around the last known position.
        if detection is None:
            detection = self.perform_nearby_search(frame)
            if detection is None:  # fixed: was `== None` (PEP 8 identity check)
                # Nearby search failed too: give up for this frame
                return None
            # Re-seed the internal tracker at the newly found position
            self.reinit_internal_tracker(frame, detection)
        # Append to history and return
        self.detect_rects_history.append(detection)
        self.detections += 1
        self.has_been_updated = True
        return detection

    def perform_nearby_search(self, frame):
        """Search windows around the previous detection; best Rect or None."""
        last_detection_rect = self.detect_rects_history[-1]
        candidate_rects = VehicleTracker.get_candidates_for_nearby_search(last_detection_rect)
        candidate_search_windows = []
        for rect in candidate_rects:
            classify_bbox = ((rect.p1.x, rect.p1.y), (rect.p2.x, rect.p2.y))
            candidate_search_windows.append(classify_bbox)
        detections = detect_cars_in_frame(frame,
                                          candidate_search_windows,
                                          self.classifier,
                                          self.process_pool
                                          )
        # Keep only detections with non-degenerate bounding boxes
        valid_rects = []
        for detection in detections[0]:
            bbox = detection
            rect = Rect(
                Point(bbox[0][0], bbox[0][1]),
                Point(bbox[1][0], bbox[1][1]))
            if rect.get_height() == 0 or rect.get_width() == 0:
                continue
            valid_rects.append(rect)
        # If no valid matches were found, this tracker may have been lost
        if len(valid_rects) == 0:
            self.losses += 1
            if self.get_is_lost_or_rejected():
                return None
            else:
                # Tolerate a few misses by reusing the previous detection
                return last_detection_rect
        # Choose the candidate whose area is closest to the blend of the
        # last detection's area and the average candidate area.
        last_detection_area = self.detect_rects_history[-1].calculate_area()
        total_rect_area = sum(rect.calculate_area() for rect in valid_rects)
        avg_rect_area = total_rect_area / len(valid_rects)
        target_rect_area = (last_detection_area + avg_rect_area) / 2
        final_rect = min(valid_rects, key=lambda rect: (rect.calculate_area() - target_rect_area)**2)
        return final_rect

    def classify_detection(self, frame, detection_rect):
        """Run the classifier on the frame patch covered by detection_rect."""
        detection_img = frame[
            detection_rect.get_top():detection_rect.get_bottom(),
            detection_rect.get_left():detection_rect.get_right()
        ]
        # Bug fix: previously used the module-level `classifier` global
        # instead of the classifier this tracker was constructed with.
        return self.classifier.is_vehicle(detection_img)

    @staticmethod
    def get_candidates_for_nearby_search(rect):
        """Generate shifted/scaled candidate rects around `rect`."""
        candidate_rects = []
        for step in range(1, TRACKER_STEPS_PER_DIRECTION):
            dx = rect.get_width()*(1-TRACKER_OVERLAP_HORIZ) * step
            # NOTE(review): dy is derived from the width, not the height —
            # this is only equivalent for square rects; confirm intent.
            dy = rect.get_width()*(1-TRACKER_OVERLAP_VERT) * step
            original = rect
            left = rect.shift(-dx, 0)
            right = rect.shift(dx, 0)
            above = rect.shift(0, -dy)
            below = rect.shift(0, dy)
            left_above = rect.shift(-dx, -dy)
            right_above = rect.shift(dx, -dy)
            left_below = rect.shift(-dx, dy)
            right_below = rect.shift(dx, dy)
            # NOTE(review): both corners come from rect.p2 — possibly p1 was
            # intended for the first corner; behavior preserved as-is.
            scale_up = Rect(rect.p2.shift(-dx, -dy), rect.p2.shift(dx, dy))
            candidate_rects.extend([original, left, right, above, below,
                                    left_above, right_above, left_below, right_below,
                                    scale_up])
            try:
                # Since scale down the way we do it might make the rectangle
                # of negative dimension, Rect construction may raise.
                scale_down = Rect(rect.p2.shift(dx, dy), rect.p2.shift(-dx, -dy))
                candidate_rects.append(scale_down)
            except Exception:  # narrowed from a bare except
                pass
        return candidate_rects

    @staticmethod
    def rect_to_tracker_bbox(rect):
        """Rect -> (x, y, w, h) tuple as expected by the OpenCV tracker API."""
        tracker_bbox = (rect.get_left(),
                        rect.get_top(),
                        rect.get_width(),
                        rect.get_height())
        return tracker_bbox

    @staticmethod
    def tracker_bbox_to_rect(tracker_bbox):
        """(x, y, w, h) tracker bbox -> Rect, clamping negatives to zero."""
        # Sanitization: the tracker can report coordinates below zero
        tracker_bbox = list(tracker_bbox)  # Convert to list for writeability
        for i in range(0, 4):
            if tracker_bbox[i] < 0:
                tracker_bbox[i] = 0
        return Rect(
            Point(tracker_bbox[0], tracker_bbox[1]),
            Point(tracker_bbox[0]+tracker_bbox[2], tracker_bbox[1]+tracker_bbox[3])
        )

    def get_stable_id(self):
        """Persistent id assigned when the tracker was created."""
        return self.stable_id

    def get_is_lost_or_rejected(self):
        """True once the tracker has missed more frames than allowed."""
        return self.losses > self.loss_threshold

    def get_is_past_detection_threshold(self):
        """True once enough detections accumulated to trust this tracker."""
        return self.detections > self.detection_threshold

    def get_latest_detection(self):
        return self.detect_rects_history[-1]

    def get_detected_rects_history(self):
        return self.detect_rects_history

    def get_has_been_updated(self):
        return self.has_been_updated
def visualize_tracker_search_area():
    """Plot the tracker's nearby-search candidate rects for fake matches."""
    # Load an image
    files = glob('test_images/test*.jpg')
    image = mpimg.imread(files[0])
    # Generate some fake matches of increasing size
    matching_rects = []
    matching_rects.append(Rect(
        Point(100, 100),
        Point(164, 164)
    ))
    matching_rects.append(Rect(
        Point(200, 200),
        Point(328, 328)
    ))
    matching_rects.append(Rect(
        Point(400, 400),
        Point(564, 564)
    ))
    # Plot one subplot per fake match, with the match itself in magenta
    f, axes_arr = plt.subplots(len(matching_rects), figsize=(24, 30))
    f.tight_layout()
    for idx, ax in enumerate(axes_arr):
        matching_rect = matching_rects[idx]
        candidate_rects = VehicleTracker.get_candidates_for_nearby_search(matching_rect)
        image_with_bboxes = draw_rects(image, candidate_rects, thick=2)
        image_with_bboxes = draw_rects(image_with_bboxes, [matching_rect], (255, 0, 255), thick=2)
        ax.set_title("Search range for match at {}".format(matching_rect))
        ax.imshow(image_with_bboxes)
    plt.subplots_adjust(left=0., right=1, top=0.9, bottom=0.1, hspace=0.2)
    plt.show()
visualize_tracker_search_area()
%%HTML
<a name="loc_apply_pipeline"></a>
<h1>Apply pipeline to video</h1>
class PerformanceTracker:
    """Collects per-frame processing durations and prints summary statistics."""

    def __init__(self):
        self.frame_processing_times = []

    def append_processing_time(self, time_span):
        """Record the duration (seconds) of one processed frame."""
        self.frame_processing_times.append(time_span)

    def print_stats(self):
        """Print total/average/max processing time; no-op when nothing recorded."""
        times = self.frame_processing_times
        if not times:
            return
        total = sum(times)
        print('Processing times: total - {} average - {} max - {} '.format(
            total,
            total / len(times),
            max(times)
        ))
def detect_cars_in_frame(image,
                         windows,
                         classifier,
                         process_pool):
    """Scan the given windows of one frame and return (bboxes, heatmap).

    Positive windows are accumulated into a heatmap, thresholded by the
    module-level DETECTION_THRESHOLD, grouped into connected components,
    and converted into one bounding box per component.
    """
    searcher = ImageWindowSearch(image, classifier)
    hot_windows = searcher.search_windows(windows, process_pool)
    heatmap = WindowSlider.windows_to_heatmap(image.shape[0:2], hot_windows)
    # Suppress weak responses
    heatmap[heatmap <= DETECTION_THRESHOLD] = 0
    # Group the surviving heat into labeled components and box them
    labels = label(heatmap)
    detected_bboxes = PartitioningWindowSlider.get_labeled_bboxes(labels)
    return detected_bboxes, heatmap
# class VideoProcessorConfig:
# # TODO: Static state
# # TODO: Frame boundaries
# # TODO: Thresholds
# pass
class FrameProcessorConfig:
    """Static configuration shared by all frames of one video-processing run."""

    def __init__(self,
                 classifier,
                 frame_size,
                 video_fps,
                 seconds_to_full_scan,
                 window_search_ranges,
                 output_heatmap_frame,
                 output_windows_frame
                 ):
        self.classifier = classifier
        self.frame_size = frame_size
        # One full scan of all sliding windows is spread across this many frames
        self.n_partitions = int(video_fps * seconds_to_full_scan)
        self.slider_group = PartitioningWindowSliderGroup(window_search_ranges, self.n_partitions)
        # Debug-output toggles
        self.output_heatmap_frame = output_heatmap_frame
        self.output_windows_frame = output_windows_frame
class State:
    """Mutable per-video processing state carried from frame to frame."""

    def __init__(self):
        self.frame_idx = 0                     # frames read from the input so far
        self.previous_batch_detections = None  # not updated in the visible code
        self.trackers = []                     # currently active vehicle trackers
        self.historical_lost_trackers = []     # trackers retired as lost/rejected
        self.vehicles_counter = 0              # running id source for new trackers
class FrameProcessor:
    """Detects vehicles frame-by-frame and maintains per-vehicle trackers.

    Detection happens on a rotating partition of the sliding windows, so a
    full scan of the frame is spread over ``config.n_partitions`` frames.
    Trackers are then the source of truth for reported detections.
    """

    def __init__(self, config):
        self.config = config
        # Half the cores: workers are CPU-bound and compete with the main loop
        self.process_pool = multiprocessing.Pool(int(multiprocessing.cpu_count() / 2))

    def _overlaps_active_tracker(self, state, rect):
        """Return True when rect overlaps an active tracker enough to be the same car."""
        current_detection_area = rect.calculate_area()
        for tracker in state.trackers:
            if tracker.get_is_lost_or_rejected():
                continue
            latest_detection = tracker.get_latest_detection()
            overlap = latest_detection.calculate_overlap(rect)
            if overlap is None:
                continue
            overlap_area = overlap.calculate_area()
            # Large overlap relative to either the new detection or the
            # tracker's previous detection means "already tracked".
            if overlap_area / current_detection_area >= OVERLAP_THRESHOLD:
                # tracker.readjust_area(frame, rect) intentionally disabled
                return True
            if overlap_area / latest_detection.calculate_area() >= OVERLAP_THRESHOLD:
                # TODO: Consider switching the tracker to the larger area
                return True
        return False

    def _spawn_trackers(self, state, frame, frame_detections):
        """Create a VehicleTracker for each detection not covered by an existing one."""
        for detection in frame_detections:
            rect = Rect(
                Point(detection[0][0], detection[0][1]),
                Point(detection[1][0], detection[1][1]))
            # Degenerate boxes carry no information (and would divide by zero below)
            if rect.get_height() == 0 or rect.get_width() == 0:
                continue
            # No new tracker when the scan window overlaps an existing tracker
            if self._overlaps_active_tracker(state, rect):
                continue
            state.vehicles_counter += 1
            tracker = VehicleTracker(self.config.classifier,
                                     state.vehicles_counter,
                                     frame,
                                     rect,
                                     self.process_pool)
            state.trackers.append(tracker)

    def process_frame(self,
                      frame,
                      state):
        """Process one video frame.

        Returns (annotated_frame, heatmap_frame_or_None, windows_frame_or_None).
        """
        # Scan only this frame's partition of the sliding windows
        partition_idx = state.frame_idx % self.config.n_partitions
        scan_windows = self.config.slider_group.get_windows_for_partition(partition_idx)
        frame_detections, heatmap_frame = detect_cars_in_frame(
            frame,
            scan_windows,
            self.config.classifier,
            self.process_pool
        )
        print("Frame {} detections: {}".format(state.frame_idx, len(frame_detections)))
        # Promote fresh detections into trackers unless already tracked
        self._spawn_trackers(state, frame, frame_detections)
        # From here on we count on the trackers to provide detections; the
        # trackers validate against the classifier and may reject themselves.
        frame_detections = []
        for tracker in state.trackers:
            rect = tracker.update(frame)
            if rect is not None:
                # Only report a tracker once it gained enough detections
                if tracker.get_is_past_detection_threshold():
                    frame_detections.append(((rect.p1.x, rect.p1.y),
                                             (rect.p2.x, rect.p2.y)))
        # Retire lost/rejected trackers into the historical list
        lost_trackers = [t for t in state.trackers if t.get_is_lost_or_rejected()]
        state.historical_lost_trackers.extend(lost_trackers)
        state.trackers = [t for t in state.trackers if not t.get_is_lost_or_rejected()]
        # TODO: For lost trackers with enough detections, run a nearby
        # classifier search to attempt a reacquire.
        quality_lost_trackers = [t for t in lost_trackers
                                 if len(t.get_detected_rects_history()) >= MIN_DETECTIONS_FOR_QUALITY_TRACKER]
        if len(quality_lost_trackers) > 0:
            # NOTE(review): State.__init__ never defines this attribute and
            # nothing visible reads it — kept for compatibility; confirm usage.
            state.too_many_lost_trackers = True
        out_frame = draw_boxes(frame, frame_detections)
        if self.config.output_heatmap_frame and heatmap_frame is not None:
            # Scale up for visibility and expand to 3 channels for the writer
            heatmap_frame = np.uint8(heatmap_frame) * 10
            heatmap_frame = np.dstack((heatmap_frame, heatmap_frame, heatmap_frame))
        else:
            heatmap_frame = None
        if self.config.output_windows_frame:
            windows_frame = draw_boxes(frame, scan_windows)
        else:
            windows_frame = None
        return out_frame, heatmap_frame, windows_frame
# TODO:
# 1. (2d) velocity and (2d) direction calculation. Can later be used for 3d calculation by transform to horizon
def _open_video_writer(output_path, video_fps, frame_size):
    """Open an H.264 ('avc1') color VideoWriter, replacing any existing file."""
    # NOTE: Overwrites existing files if present
    if os.path.exists(output_path):
        os.remove(output_path)
    fourcc = cv2.VideoWriter_fourcc(*'avc1')
    return cv2.VideoWriter(output_path,
                           fourcc,
                           fps=video_fps,
                           frameSize=frame_size,
                           isColor=True)

def process_video(video_input_path,
                  video_output_path,
                  heatmap_output_path=None,
                  windows_output_path=None,
                  frame_start=0,
                  frame_end=None):
    """Run the detection pipeline over a video file.

    Writes the annotated video to video_output_path; optionally writes heatmap
    and scan-window debug videos. Only frames in [frame_start, frame_end] are
    processed. Uses the module-level `classifier` and scan-range constants.
    """
    # Get info on the input video
    video = cv2.VideoCapture(video_input_path)
    video_fps = video.get(cv2.CAP_PROP_FPS)
    frame_size = (int(video.get(cv2.CAP_PROP_FRAME_WIDTH)),
                  int(video.get(cv2.CAP_PROP_FRAME_HEIGHT)))
    # Prepare output video files (debug outputs only when a path is given)
    video_out = _open_video_writer(video_output_path, video_fps, frame_size)
    heatmap_out = None
    if heatmap_output_path is not None:
        heatmap_out = _open_video_writer(heatmap_output_path, video_fps, frame_size)
    windows_out = None
    if windows_output_path is not None:
        windows_out = _open_video_writer(windows_output_path, video_fps, frame_size)
    # Per-video configuration and state
    frame_processor_config = FrameProcessorConfig(
        classifier,  # module-level global
        frame_size,
        video_fps,
        seconds_to_full_scan=SECONDS_TO_FULL_SCAN,
        window_search_ranges=WINDOW_SEARCH_RANGES,
        output_heatmap_frame=(heatmap_out is not None),
        output_windows_frame=(windows_out is not None)
    )
    frame_processor = FrameProcessor(frame_processor_config)
    performance_tracker = PerformanceTracker()
    state = State()
    try:
        while video.isOpened():
            time_op_start = time.time()
            # Read a frame; stop at end of input
            ret, frame = video.read()
            if not ret:
                break
            state.frame_idx += 1
            if state.frame_idx < frame_start:
                continue
            if frame_end is not None and state.frame_idx > frame_end:
                # Past the requested range — no need to decode the rest
                break
            # Process a frame
            out_frame, heatmap_frame, windows_frame = frame_processor.process_frame(frame, state)
            # Measure performance.
            # The search's vast majority of time (86%) is lost to Python's locking:
            #   244512 function calls in 201.634 seconds
            #   1757  174.521  0.099  174.521  0.099 {method 'acquire' of '_thread.lock' objects}
            performance_tracker.append_processing_time(time.time() - time_op_start)
            if state.frame_idx % 10 == 0:
                print("Processed frame ", state.frame_idx)
            # Write video frames
            video_out.write(out_frame)
            if heatmap_frame is not None:
                heatmap_out.write(heatmap_frame)
            if windows_frame is not None:
                windows_out.write(windows_frame)
    finally:
        # Always report and release, even if processing raised
        performance_tracker.print_stats()
        print("Releasing everything")
        video.release()
        video_out.release()
        if heatmap_out:
            heatmap_out.release()
        if windows_out:
            windows_out.release()
def process_full_project_video():
    """Run the full project video through the pipeline (all frames).

    Heatmap/windows debug outputs are disabled; pass
    PROJECT_HEATMAP_OUTPUT_PATH / PROJECT_WINDOWS_OUTPUT_PATH to re-enable.
    """
    process_video(PROJECT_VIDEO_INPUT_PATH,
                  PROJECT_VIDEO_OUTPUT_PATH,
                  heatmap_output_path=None,
                  windows_output_path=None,
                  frame_start=0,
                  frame_end=None)
def process_test_video():
    """Run the short test video with default frame range and no debug outputs."""
    process_video(TEST_VIDEO_INPUT_PATH, TEST_VIDEO_OUTPUT_PATH)
def process_part_project_video():
    """Process only frames 200-400 of the project video (quick iteration).

    NOTE(review): writes to the same PROJECT_VIDEO_OUTPUT_PATH as the full
    run, so it overwrites that output — confirm this is intended.
    """
    process_video(PROJECT_VIDEO_INPUT_PATH,
                  PROJECT_VIDEO_OUTPUT_PATH,
                  heatmap_output_path=None,
                  windows_output_path=None,
                  frame_start=200,
                  frame_end=400)
# Profiling alternative (uncomment in the notebook to profile a short run):
# %prun process_part_project_video()
# Entry point: run the entire project video through the detection pipeline
process_full_project_video()